#include "include/CMqttClient.h"
#include "include/CGetHostByName.h"
/**
 *
 * Copyright(c), SuperHouse Co.,Ltd.
 * All rights reserved.
 *
 * @brief   mqtt 客服端封装
 * @note    -
 * @author  lth
 * @date    2021-12-18
 *
 */

BASENET_BEGIN_NAMESPACE

// mqtt 定时器
static void mqtt_timer_fn(void *arg)
{
	CMqttClient *p = (CMqttClient *)arg;
	if(p) {
		p->__timer();
	}
}

// mqtt 回调函数
static void mqtt_fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {
	CMqttClient *p = (CMqttClient *)fn_data;
	if(p){
		p->__mqttFn(c,ev,ev_data);
	}
}

//构造函数
CMqttClient::CMqttClient()
{
	m_bInit = false;
	m_bRunStatus = false;
	m_bConnestStatus = false;
	m_sMqttCon = NULL;
	m_iKeepLiveSendNum =0;
	m_iReConTime = 0;
	m_lPrevSecondRun = 0;
	m_pBack = NULL;
	memset(&m_sOpts,0x00,sizeof(struct mg_mqtt_opts));
	memset(&m_sTlsOpts,0x00,sizeof(struct mg_tls_opts));
	CGetHostByName::initMongoose();

}

// 析构函数
CMqttClient::~CMqttClient()
{
	stop();
}

// 启动MQTT客服端服务
bool CMqttClient::start()
{
	int topts = MG_TIMER_REPEAT | MG_TIMER_RUN_NOW;
	if(!m_bInit){
        qDebug("not init");
		return false;
	}
	if(m_bRunStatus) {
        qDebug("is run ...");
		return true;
	}
	m_bRunStatus = true;
	mg_mgr_init(&m_sMgr);
    mg_timer_init(&m_sMgr,&m_sTimer, m_iReConTime, topts, mqtt_timer_fn, this);
    QThread::start();
	return true;
}

// 停止MQTT客服端服务
void CMqttClient::stop()
{
	if(!m_bRunStatus){
		return;
	}

	m_bRunStatus = false;
    wait();
	mg_timer_free(&m_sMgr,&m_sTimer);
	mg_mgr_free(&m_sMgr);
}

// 运行线程
void CMqttClient::run()
{
	while(m_bRunStatus){
		mg_mgr_poll(&m_sMgr, 1000);
	}
}

// 定时器回调
void CMqttClient::__timer()
{
	if(!m_bRunStatus){
		return;
	}
	if (m_sMqttCon == NULL){
		m_sMqttCon = mg_mqtt_connect(&m_sMgr, m_sUrl.c_str(), &m_sOpts, mqtt_fn, this);
	}
}

// mqtt消息回调
void CMqttClient::__mqttFn(struct mg_connection *c, int ev, void *ev_data)
{
  if (ev == MG_EV_OPEN) {
     qDebug("MG_EV_OPEN");
  } else if (ev == MG_EV_ERROR) {
     qDebug("MG_EV_ERROR");
  } else if(ev == MG_EV_CLOSE){
     QMutexLocker lock(&m_sLock);
	 m_sMqttCon = NULL;
     qDebug("MG_EV_CLOSE");
	 m_bConnestStatus = false;
	 if(m_pBack && m_pBack->conStatus){
		 m_pBack->conStatus(m_bConnestStatus,m_pData);
	 }

  }else if (ev == MG_EV_CONNECT) {

	 if (mg_url_is_ssl(m_sUrl.c_str())) {
		 struct mg_str host = mg_url_host(m_sUrl.c_str());
		 if(m_sTlsOpts.srvname.len == 0){
			 m_sTlsOpts.srvname = host;
		 }
	     mg_tls_init(c, &m_sTlsOpts);
         qDebug("ssl MG_EV_CONNECT");
	 } else {
         qDebug("MG_EV_CONNECT");
	 }
  } else if (ev == MG_EV_MQTT_OPEN) {
    qDebug("MG_EV_MQTT_OPEN");
	m_bConnestStatus = true;
	if(m_pBack && m_pBack->conStatus){
		 m_pBack->conStatus(m_bConnestStatus,m_pData);
	}
	m_iKeepLiveSendNum = 0;
	m_lPrev_second = 0;
	m_sMqttCon = c;
	// 登录成功后的回调
  } else if (ev == MG_EV_MQTT_MSG) {
	// MQTT 消息
	struct mg_mqtt_message *mm = (struct mg_mqtt_message *) ev_data;
	if(m_pBack && m_pBack->publishData){
		string topics = string(mm->topic.ptr,(int) mm->topic.len);
		string msg = string(mm->data.ptr,(int) mm->data.len);
	    m_pBack->publishData(topics,msg,m_pData);
	}
  } else if(ev == MG_EV_MQTT_CMD){
	  struct mg_mqtt_message *pMqtt = (struct mg_mqtt_message *) ev_data;
	  if(m_pBack && m_pBack->mqttRecvCmd){
		  m_pBack->mqttRecvCmd(pMqtt,m_pData);
	  }
	  switch (pMqtt->cmd) {
		   case MQTT_CMD_PINGREQ:
			   mg_mqtt_pong(c);
			 break;
		   case MQTT_CMD_PINGRESP:
			   m_iKeepLiveSendNum = 0;
			   break;
	  }
	  mg_mqtt_pubAck(c,pMqtt);
  }

  else if (ev == MG_EV_POLL) {
	  unsigned long second = (*(unsigned long *) ev_data) / 1000;
	  if(m_lPrevSecondRun == second){
		  return;
	  }
	  m_lPrevSecondRun = second;
	  // 心跳检测
	  if(m_bConnestStatus){
		  if(m_lPrev_second == 0) {
			  m_lPrev_second = second;
		  }
		  if(m_sOpts.keepalive > 0){
			  if(abs((long)(second - m_lPrev_second)) >= m_sOpts.keepalive)
			  {
				  mg_mqtt_ping(c);
				  m_iKeepLiveSendNum++;
				  m_lPrev_second = second;
			  }
		  }
		  if(m_iKeepLiveSendNum >= 3){
			  c->is_closing = 1;
              qDebug("mqtt ping time out %d",m_iKeepLiveSendNum);
			  m_iKeepLiveSendNum = 0;

		  }
	  }
	  if(m_pBack && m_pBack->secondTime){
	  		  m_pBack->secondTime(second,m_pData);
	  }
  }
}

// 停止MQTT客服端服务
void CMqttClient::init(string url,struct mg_mqtt_opts *sOpts,struct mg_tls_opts *sTlsOpts,int reconnectionTime)
{
	m_sUrl = url;

	if(sOpts){
		 memcpy(&m_sOpts,sOpts,sizeof(struct mg_mqtt_opts));
		 if(sOpts->client_id.ptr){
			 m_tOpts.client_id = string(sOpts->client_id.ptr,sOpts->client_id.len);
			 m_sOpts.client_id = mg_str( m_tOpts.client_id.c_str());
		 }

		 if(sOpts->user.ptr){
			 m_tOpts.user = string(sOpts->user.ptr,sOpts->user.len);
			 m_sOpts.user = mg_str( m_tOpts.user.c_str());
		 }

		 if(sOpts->pass.ptr){
			 m_tOpts.pass = string(sOpts->pass.ptr,sOpts->pass.len);
			 m_sOpts.pass = mg_str( m_tOpts.pass.c_str());
		 }

		 if(sOpts->will_topic.ptr){
			 m_tOpts.will_topic = string(sOpts->will_topic.ptr,sOpts->will_topic.len);
			 m_sOpts.will_topic = mg_str( m_tOpts.will_topic.c_str());
		 }

		 if(sOpts->will_message.ptr){
			 m_tOpts.will_message = string(sOpts->will_message.ptr,sOpts->will_message.len);
			 m_sOpts.will_message = mg_str( m_tOpts.will_message.c_str());
		 }
	}
	if(sTlsOpts){
		 memcpy(&m_sTlsOpts,sTlsOpts,sizeof(struct mg_tls_opts));
		 if(sTlsOpts->ca){
			 m_tTlsOpts.ca = sTlsOpts->ca;
			 m_sTlsOpts.ca =  m_tTlsOpts.ca.c_str();
		 }
		 if(sTlsOpts->crl){
			 m_tTlsOpts.crl = sTlsOpts->crl;
			 m_sTlsOpts.crl =  m_tTlsOpts.crl.c_str();
		 }
		 if(sTlsOpts->cert){
			 m_tTlsOpts.cert = sTlsOpts->cert;
			 m_sTlsOpts.cert =  m_tTlsOpts.cert.c_str();
		 }
		 if(sTlsOpts->certkey){
			 m_tTlsOpts.certkey = sTlsOpts->certkey;
			 m_sTlsOpts.certkey =  m_tTlsOpts.certkey.c_str();
		 }
		 if(sTlsOpts->ciphers){
			 m_tTlsOpts.ciphers = sTlsOpts->ciphers;
			 m_sTlsOpts.ciphers =  m_tTlsOpts.ciphers.c_str();
		 }
		 if(sTlsOpts->srvname.ptr){
			 m_tTlsOpts.srvname = string(sTlsOpts->srvname.ptr,sTlsOpts->srvname.len);
			 m_sTlsOpts.srvname =  mg_str( m_tTlsOpts.srvname.c_str());
		 }
	}
	m_iReConTime = reconnectionTime;
	m_bInit = true;
}

// 订阅
uint16_t CMqttClient::subscribes(const char *topics,int qos)
{
	struct mg_str str =mg_str(topics);
    QMutexLocker lock(&m_sLock);
	if(m_bConnestStatus && m_sMqttCon){
		 return mg_mqtt_sub(m_sMqttCon,&str,qos);
	}
	return 0;
}

uint16_t CMqttClient::unSubscribes(char **topics,size_t topics_len)
{
    QMutexLocker lock(&m_sLock);
	if(m_bConnestStatus && m_sMqttCon){
		 return mg_mqtt_unsub(m_sMqttCon,topics,topics_len);
	}
	return 0;
}

// 发布消息
uint16_t  CMqttClient::publish(const char *topics,string &data,int qos,bool retain)
{
	struct mg_str str_topic =mg_str(topics);
	struct mg_str str_msg =mg_str(data.c_str());
    QMutexLocker lock(&m_sLock);
	if(m_bConnestStatus && m_sMqttCon){
		m_lPrev_second = 0;
		if(qos > 0){
			return mg_mqtt_pub(m_sMqttCon,&str_topic,&str_msg,qos,retain);
		}
		mg_mqtt_pub(m_sMqttCon,&str_topic,&str_msg,qos,retain);
		return 1;
	}
	return 0;
}

// 设置mqtt回调
void CMqttClient::setBack(tsMqttCb *back,void *data)
{
	m_pData = data;
	m_pBack = back;
}


BASENET_END_NAMESPACE




